<!DOCTYPE html>



  


<html class="theme-next muse use-motion" lang="zh-Hans">
<head>
  <meta charset="UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"/>
<meta name="theme-color" content="#222">









<meta http-equiv="Cache-Control" content="no-transform" />
<meta http-equiv="Cache-Control" content="no-siteapp" />
















  
  
  <link href="/lib/fancybox/source/jquery.fancybox.css?v=2.1.5" rel="stylesheet" type="text/css" />







<link href="/lib/font-awesome/css/font-awesome.min.css?v=4.6.2" rel="stylesheet" type="text/css" />

<link href="/css/main.css?v=5.1.4" rel="stylesheet" type="text/css" />


  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="16x16" href="/images/favicon-16x16-next.png?v=5.1.4">


  <link rel="mask-icon" href="/images/logo.svg?v=5.1.4" color="#222">





  <meta name="keywords" content="Hexo, NexT" />










<meta name="description" content="温馨提示：建议参考代码RocketMQ4.4版本，4.5版本引入了多副本机制，实现了主从自动切换，本文并不关心主从切换功能。  1、初识主从同步主从同步基本实现过程如下图所示：RocketMQ 的主从同步机制如下：A. 首先启动Master并在指定端口监听；B. 客户端启动，主动连接Master，建立TCP连接；C. 客户端以每隔5s的间隔时间向服务端拉取消息，如果是第一次拉取的话，先获取本地c">
<meta property="og:type" content="article">
<meta property="og:title" content="RocketMQ HA机制(主从同步)">
<meta property="og:url" content="https://www.codingw.net/posts/12eccc4e.html">
<meta property="og:site_name" content="中间件兴趣圈">
<meta property="og:description" content="温馨提示：建议参考代码RocketMQ4.4版本，4.5版本引入了多副本机制，实现了主从自动切换，本文并不关心主从切换功能。  1、初识主从同步主从同步基本实现过程如下图所示：RocketMQ 的主从同步机制如下：A. 首先启动Master并在指定端口监听；B. 客户端启动，主动连接Master，建立TCP连接；C. 客户端以每隔5s的间隔时间向服务端拉取消息，如果是第一次拉取的话，先获取本地c">
<meta property="og:locale">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190625233757881.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190625234245349.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190625234309172.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190625234502316.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190625234556971.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="article:published_time" content="2020-09-04T14:22:35.000Z">
<meta property="article:modified_time" content="2021-04-26T12:33:59.992Z">
<meta property="article:author" content="中间件兴趣圈">
<meta property="article:tag" content="中间件">
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://img-blog.csdnimg.cn/20190625233757881.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">



<script type="text/javascript" id="hexo.configurations">
  var NexT = window.NexT || {};
  var CONFIG = {
    root: '',
    scheme: 'Muse',
    version: '5.1.4',
    sidebar: {"position":"left","display":"post","offset":12,"b2t":false,"scrollpercent":false,"onmobile":false},
    fancybox: true,
    tabs: true,
    motion: {"enable":true,"async":false,"transition":{"post_block":"fadeIn","post_header":"slideDownIn","post_body":"slideDownIn","coll_header":"slideLeftIn","sidebar":"slideUpIn"}},
    duoshuo: {
      userId: '0',
      author: '博主'
    },
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: {"per_page":10},
      labels: {"input_placeholder":"Search for Posts","hits_empty":"We didn't find any results for the search: ${query}","hits_stats":"${hits} results found in ${time} ms"}
    }
  };
</script>



  <link rel="canonical" href="https://www.codingw.net/posts/12eccc4e.html"/>





  <title>RocketMQ HA机制(主从同步) | 中间件兴趣圈</title>
  








<meta name="generator" content="Hexo 5.4.0"></head>

<body itemscope itemtype="http://schema.org/WebPage" lang="zh-Hans">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta ">
    

    <div class="custom-logo-site-title">
      <a href="/"  class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">中间件兴趣圈</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
      
        <p class="site-subtitle">微信搜『中间件兴趣圈』，回复『Java』获取200本优质电子书</p>
      
  </div>

  <div class="site-nav-toggle">
    <button>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>

<nav class="site-nav">
  

  
    <ul id="menu" class="menu">
      
        
        <li class="menu-item menu-item-home">
          <a href="/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-home"></i> <br />
            
            首页
          </a>
        </li>
      
        
        <li class="menu-item menu-item-categories">
          <a href="/categories/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            分类
          </a>
        </li>
      
        
        <li class="menu-item menu-item-archives">
          <a href="/archives/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            归档
          </a>
        </li>
      

      
    </ul>
  

  
</nav>



 </div>
    </header>

    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  <article class="post post-type-normal" itemscope itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://www.codingw.net/posts/12eccc4e.html">

    <span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
      <meta itemprop="name" content="">
      <meta itemprop="description" content="">
      <meta itemprop="image" content="/images/avatar.gif">
    </span>

    <span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="中间件兴趣圈">
    </span>

    
      <header class="post-header">

        
        
          <h1 class="post-title" itemprop="name headline">RocketMQ HA机制(主从同步)</h1>
        

        <div class="post-meta">
          <span class="post-time">
            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              
              <time title="创建于" itemprop="dateCreated datePublished" datetime="2020-09-04T22:22:35+08:00">
                2020-09-04
              </time>
            

            

            
          </span>

          
            <span class="post-category" >
            
              <span class="post-meta-divider">|</span>
            
              <span class="post-meta-item-icon">
                <i class="fa fa-folder-o"></i>
              </span>
              
                <span class="post-meta-item-text">分类于</span>
              
              
                <span itemprop="about" itemscope itemtype="http://schema.org/Thing">
                  <a href="/categories/rocketmq/" itemprop="url" rel="index">
                    <span itemprop="name">rocketmq</span>
                  </a>
                </span>

                
                
              
            </span>
          

          
            
          

          
          
             <span id="/posts/12eccc4e.html" class="leancloud_visitors" data-flag-title="RocketMQ HA机制(主从同步)">
               <span class="post-meta-divider">|</span>
               <span class="post-meta-item-icon">
                 <i class="fa fa-eye"></i>
               </span>
               
                 <span class="post-meta-item-text">阅读次数&#58;</span>
               
                 <span class="leancloud-visitors-count"></span>
             </span>
          

          
            <span class="post-meta-divider">|</span>
            <span class="page-pv"><i class="fa fa-file-o"></i>
            <span class="busuanzi-value" id="busuanzi_value_page_pv" ></span>次
            </span>
          

          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <div id="vip-container"><blockquote>
<p>温馨提示：建议参考代码RocketMQ4.4版本，4.5版本引入了多副本机制，实现了主从自动切换，本文并不关心主从切换功能。</p>
</blockquote>
<h2 id="1、初识主从同步"><a href="#1、初识主从同步" class="headerlink" title="1、初识主从同步"></a>1、初识主从同步</h2><p>主从同步基本实现过程如下图所示：<br><img src="https://img-blog.csdnimg.cn/20190625233757881.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>RocketMQ 的主从同步机制如下：<br>A. 首先启动Master并在指定端口监听；<br>B. 客户端启动，主动连接Master，建立TCP连接；<br>C. 客户端以每隔5s的间隔时间向服务端拉取消息，如果是第一次拉取的话，先获取本地commitlog文件中最大的偏移量，以该偏移量向服务端拉取消息；<br>D. 服务端解析请求，并返回一批数据给客户端；<br>E. 客户端收到一批消息后，将消息写入本地commitlog文件中，然后向Master汇报拉取进度，并更新下一次待拉取偏移量；<br>F. 然后重复第3步；</p>
<p>RocketMQ主从同步一个重要的特征：主从同步不具备主从切换功能，即当主节点宕机后，从不会接管消息发送，但可以提供消息读取。</p>
<blockquote>
<p>温馨提示：本文并不会详细分析RocketMQ主从同步的实现细节，如大家对其感兴趣，可以查阅笔者所著的《RocketMQ技术内幕》或查看笔者博文：<a target="_blank" rel="noopener" href="https://blog.csdn.net/prestigeding/article/details/79600792">https://blog.csdn.net/prestigeding/article/details/79600792</a></p>
</blockquote>
<h2 id="2、提出问题"><a href="#2、提出问题" class="headerlink" title="2、提出问题"></a>2、提出问题</h2><ul>
<li>主，从服务器都在运行过程中，消息消费者是从主拉取消息还是从从拉取？</li>
<li>RocketMQ主从同步架构中，如果主服务器宕机，从服务器会接管消息消费，此时消息消费进度如何保持，当主服务器恢复后，消息消费者是从主拉取消息还是从从服务器拉取，主从服务器之间的消息消费进度如何同步？</li>
</ul>
<p>接下来带着上述问题，一起来探究其实现原理。</p>
<h2 id="3、原理探究"><a href="#3、原理探究" class="headerlink" title="3、原理探究"></a>3、原理探究</h2><h3 id="3-1-RocketMQ主从读写分离机制"><a href="#3-1-RocketMQ主从读写分离机制" class="headerlink" title="3.1 RocketMQ主从读写分离机制"></a>3.1 RocketMQ主从读写分离机制</h3><p>RocketMQ的主从同步，在默认情况下RocketMQ会优先选择从主服务器进行拉取消息，并不是通常意义的上的读写分离，那什么时候会从拉取呢？</p>
<blockquote>
<p>温馨提示：本节同样不会详细整个流程，只会点出其关键点，如果想详细了解消息拉取、消息消费等核心流程，建议大家查阅笔者所著的《RocketMQ技术内幕》。</p>
</blockquote>
<p>在RocketMQ中判断是从主拉取，还是从从拉取的核心代码如下：<br>DefaultMessageStore#getMessage</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">long</span> diff = maxOffsetPy - maxPhyOffsetPulling;  <span class="comment">// @1</span></span><br><span class="line"><span class="keyword">long</span> memory = (<span class="keyword">long</span>) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE</span><br><span class="line">                            * (<span class="keyword">this</span>.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / <span class="number">100.0</span>));  <span class="comment">// @2</span></span><br><span class="line">getResult.setSuggestPullingFromSlave(diff &gt; memory);   <span class="comment">// @3</span></span><br></pre></td></tr></table></figure>
<p>代码@1：首先介绍一下几个局部变量的含义：</p>
<ul>
<li>maxOffsetPy<br>当前最大的物理偏移量。返回的偏移量为已存入到操作系统的PageCache中的内容。</li>
<li>maxPhyOffsetPulling<br>本次消息拉取最大物理偏移量，按照消息顺序拉取的基本原则，可以基本预测下次开始拉取的物理偏移量将大于该值，并且就在其附近。</li>
<li>diff<br>maxOffsetPy与maxPhyOffsetPulling之间的间隔，getMessage通常用于消息消费时，即这个间隔可以理解为目前未处理的消息总大小。</li>
</ul>
<p>代码@2：获取RocketMQ消息存储在PageCache中的总大小，如果当RocketMQ容量超过该阔值，将会将被置换出内存，如果要访问不在PageCache中的消息，则需要从磁盘读取。</p>
<ul>
<li>StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE<br>返回当前系统的总物理内存。参数</li>
<li>accessMessageInMemoryMaxRatio<br>设置消息存储在内存中的阀值，默认为40。<br>结合代码@2这两个参数的含义，算出RocketMQ消息能映射到内存中最大值为40% * (机器物理内存)。</li>
</ul>
<p>代码@3：设置下次拉起是否从从拉取标记，触发下次从从服务器拉取的条件为：当前所有可用消息数据(所有commitlog)文件的大小已经超过了其阔值，默认为物理内存的40%。</p>
<span id="more"></span>

<p>那GetResult的suggestPullingFromSlave属性在哪里使用呢？</p>
<p>PullMessageProcessor#processRequest</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (getMessageResult.isSuggestPullingFromSlave()) &#123;      <span class="comment">// @1</span></span><br><span class="line">responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());</span><br><span class="line">&#125; <span class="keyword">else</span> &#123;</span><br><span class="line">       responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);</span><br><span class="line">&#125;</span><br><span class="line"><span class="keyword">switch</span> (<span class="keyword">this</span>.brokerController.getMessageStoreConfig().getBrokerRole()) &#123;      <span class="comment">// @2</span></span><br><span class="line">       <span class="keyword">case</span> ASYNC_MASTER:</span><br><span class="line">       <span class="keyword">case</span> SYNC_MASTER:</span><br><span class="line">               <span class="keyword">break</span>;</span><br><span class="line">       <span class="keyword">case</span> SLAVE:</span><br><span class="line">               <span class="keyword">if</span> (!<span class="keyword">this</span>.brokerController.getBrokerConfig().isSlaveReadEnable()) &#123;</span><br><span class="line">                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);</span><br><span class="line">                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);</span><br><span class="line">               &#125;</span><br><span class="line">              <span class="keyword">break</span>;</span><br><span class="line"> &#125; </span><br><span class="line"> </span><br><span class="line"><span class="keyword">if</span> (<span class="keyword">this</span>.brokerController.getBrokerConfig().isSlaveReadEnable()) &#123; <span class="comment">// @3</span></span><br><span class="line">            <span class="comment">// consume too slow ,redirect to another machine</span></span><br><span class="line">            <span class="keyword">if</span> (getMessageResult.isSuggestPullingFromSlave()) &#123;</span><br><span class="line">                 responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());</span><br><span class="line">            &#125;</span><br><span class="line">           <span class="comment">// consume ok</span></span><br><span class="line">           <span class="keyword">else</span> &#123;</span><br><span class="line">                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());</span><br><span class="line">           &#125;</span><br><span class="line">     &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">           responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);</span><br><span class="line">     &#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：如果从commitlog文件查找消息时，发现消息堆积太多，默认超过物理内存的40%后，会建议从从服务器读取。</p>
<p>代码@2：如果当前服务器的角色为从服务器:并且slaveReadEnable=true，则忽略代码@1设置的值，下次拉取切换为从主拉取。</p>
<p>代码@3：如果slaveReadEnable=true(从允许读)，并且建议从从服务器读取，则从消息消费组建议当消息消费缓慢时建议的拉取brokerId，由订阅组配置属性whichBrokerWhenConsumeSlowly决定；如果消息消费速度正常，则使用订阅组建议的brokerId拉取消息进行消费，默认为主服务器。如果不允许从可读，则固定使用从主拉取。</p>
<blockquote>
<p>温馨提示：请注意broker服务参数slaveReadEnable，与订阅组配置信息：whichBrokerWhenConsumeSlowly、brokerId的值，在生产环境中，可以通过updateSubGroup命令动态改变订阅组的配置信息。</p>
</blockquote>
<p>如果订阅组的配置保持默认值的话，拉取消息请求发送到从服务器后，下一次消息拉取，无论是否开启slaveReadEnable，下一次拉取，还是会发往主服务器。</p>
<p>上面的步骤，在消息拉取命令的返回字段中，会将下次建议拉取Broker返回给客户端，根据其值从指定的broker拉取。</p>
<p>消息拉取实现PullAPIWrapper在处理拉取结果时会将服务端建议的brokerId更新到broker拉取缓存表中。<br><img src="https://img-blog.csdnimg.cn/20190625234245349.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>在发起拉取请求之前，首先根据如下代码，选择待拉取消息的Broker。<br><img src="https://img-blog.csdnimg.cn/20190625234309172.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"></p>
<h3 id="3-2-消息消费进度同步机制"><a href="#3-2-消息消费进度同步机制" class="headerlink" title="3.2 消息消费进度同步机制"></a>3.2 消息消费进度同步机制</h3><p>从上面内容可知，主从同步引入的主要目的就是消息堆积的内容默认超过物理内存的40%，则消息读取则由从服务器来接管，实现消息的读写分离，避免主服务IO抖动严重。那问题来了，主服务器宕机后，从服务器接管消息消费后，那消息消费进度存储在哪里？当主服务器恢复正常后，消息是从主服务器拉取还是从从服务器拉取？主服务器如何得知最新的消息消费进度呢？</p>
<p>RocketMQ消息消费进度管理（集群模式）：<br>集群模式下消息消费进度存储文件位于服务端${ROCKETMQ_HOME}/store/config/consumerOffset.json。消息消费者从服务器拉取一批消息后提交到消费组特定的线程池中处理消息，当消息消费成功后会向Broker发送ACK消息，告知消费端已成功消费到哪条消息，Broker收到消息消费进度反馈后，首先存储在内存中，然后定时持久化到consumeOffset.json文件中。备注：关于消息消费进度管理更多的实现细节，建议查阅笔者所著的《RocketMQ技术内幕》。</p>
<p>我们先看一下客户端向服务端反馈消息消费进度时如何选择Broker。<br>因为主服务的brokerId为0，默认情况下当主服务器存活的时候，优先会选择主服务器，只有当主服务器宕机的情况下，才会选择从服务器。</p>
<p>既然集群模式下消息消费进度存储在Broker端，当主服务器正常时，消息消费进度文件存储在主服务器，那提出如下两个问题：<br>1）消息消费端在主服务器存活的情况下，会优先向主服务器反馈消息消费进度，那从服务器是如何同步消息消费进度的。<br>2）当主服务器宕机后则消息消费端会向从服务器反馈消息消费进度，此时消息消费进度如何存储，当主服务器恢复正常后，主服务器如何得知最新的消息消费进度。</p>
<p>为了解开上述两个疑问，我们优先来看一下Broker服务器在收到提交消息消费进度反馈命令后的处理逻辑：</p>
<p>客户端定时向Broker端发送更新消息消费进度的请求，其入口为：RemoteBrokerOffsetStore#updateConsumeOffsetToBroker，该方法中一个非常关键的点是：选择broker的逻辑，如下所示：<br><img src="https://img-blog.csdnimg.cn/20190625234502316.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>如果主服务器存活，则选择主服务器，如果主服务器宕机，则选择从服务器。也就是说，不管消息是从主服务器拉取的还是从从服务器拉取的，提交消息消费进度请求，优先选择主服务器。服务端就是接收其偏移量，更新到服务端的内存中，然后定时持久化到${ROCKETMQ_HOME}/store/config/consumerOffset.json。</p>
<p>经过上面的分析，我们来讨论一下这个场景：<br>消息消费者首先从主服务器拉取消息，并向其提交消息消费进度，如果当主服务器宕机后，从服务器会接管消息拉取服务，此时消息消费进度存储在从服务器，主从服务器的消息消费进度会出现不一致？那当主服务器恢复正常后，两者之间的消息消费进度如何同步？</p>
<h5 id="3-2-1-从服务定时同步主服务器进度"><a href="#3-2-1-从服务定时同步主服务器进度" class="headerlink" title="3.2.1 从服务定时同步主服务器进度"></a>3.2.1 从服务定时同步主服务器进度</h5><p><img src="https://img-blog.csdnimg.cn/20190625234556971.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>如果Broker角色为从服务器，会通过定时任务调用syncAll，从主服务器定时同步topic路由信息、消息消费进度、延迟队列处理进度、消费组订阅信息。</p>
<p>那问题来了，如果主服务器启动后，从服务器马上从主服务器同步消息消息进度，那岂不是又要重新消费？</p>
<p>其实在绝大部分情况下，就算从服务从主服务器同步了很久之前的消费进度，只要消息者没有重新启动，就不需要重新消费，在这种情况下，RocketMQ提供了两种机制来确保不丢失消息消费进度。</p>
<p>第一种，消息消费者在内存中存在最新的消息消费进度，继续以该进度去服务器拉取消息后，消息处理完后，会定时向Broker服务器反馈消息消费进度，在上面也提到过，在反馈消息消费进度时，会优先选择主服务器，此时主服务器的消息消费进度就立马更新了，从服务器此时只需定时同步主服务器的消息消费进度即可。</p>
<p>第二种是，消息消费者在向主服务器拉取消息时，如果是是主服务器，在处理消息拉取时，也会更新消息消费进度。</p>
<h5 id="3-2-2-主服务器消息拉取时更新消息消费进度"><a href="#3-2-2-主服务器消息拉取时更新消息消费进度" class="headerlink" title="3.2.2 主服务器消息拉取时更新消息消费进度"></a>3.2.2 主服务器消息拉取时更新消息消费进度</h5><p>主服务器在处理消息拉取命令时，会触发消息消费进度的更新，其代码入口为：PullMessageProcessor#processRequest</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">boolean</span> storeOffsetEnable = brokerAllowSuspend;  <span class="comment">// @1</span></span><br><span class="line">storeOffsetEnable = storeOffsetEnable &amp;&amp; hasCommitOffsetFlag; </span><br><span class="line">storeOffsetEnable = storeOffsetEnable</span><br><span class="line">            &amp;&amp; <span class="keyword">this</span>.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;  <span class="comment">// @2</span></span><br><span class="line"><span class="keyword">if</span> (storeOffsetEnable) &#123;</span><br><span class="line">            <span class="keyword">this</span>.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),</span><br><span class="line">                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());</span><br><span class="line"> &#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：首先介绍几个局部变量的含义：</p>
<ul>
<li>brokerAllowSuspend：broker是否允许挂起，在消息拉取时，该值默认为true。</li>
<li>hasCommitOffsetFlag：消息消费者在内存中是否缓存了消息消费进度，如果缓存了，该标记设置为true。<br>如果Broker的角色为主服务器，并且上面两个变量都为true，则首先使用commitOffset更新消息消费进度。</li>
</ul>
<p>看到这里，主从同步消息消费进度的相关问题，应该就有了答案了。</p>
<h2 id="4、总结"><a href="#4、总结" class="headerlink" title="4、总结"></a>4、总结</h2><p>上述实现原理的讲解有点枯燥无味，我们先来回答如下几个问题：</p>
<p>1、主，从服务器都在运行过程中，消息消费者是从主拉取消息还是从从拉取？<br>答：默认情况下，RocketMQ消息消费者从主服务器拉取，当主服务器积压的消息超过了物理内存的40%，则建议从从服务器拉取。但如果slaveReadEnable为false，表示从服务器不可读，从服务器也不会接管消息拉取。</p>
<p>2、当消息消费者向从服务器拉取消息后，会一直从从服务器拉取？<br>答：不是的。分如下情况：<br>1）如果从服务器的slaveReadEnable设置为false，则下次拉取，从主服务器拉取。<br>2）如果从服务器允许读取并且从服务器积压的消息未超过其物理内存的40%，下次拉取使用的Broker为订阅组的brokerId指定的Broker服务器，该值默认为0，代表主服务器。<br>3）如果从服务器允许读取并且从服务器积压的消息超过了其物理内存的40%，下次拉取使用的Broker为订阅组的whichBrokerWhenConsumeSlowly指定的Broker服务器，该值默认为1，代表从服务器。</p>
<p>3、主从服务消息消费进是如何同步的？<br>答：消息消费进度的同步时单向的，从服务器开启一个定时任务，定时从主服务器同步消息消费进度；无论消息消费者是从主服务器拉的消息还是从从服务器拉取的消息，在向Broker反馈消息消费进度时，优先向主服务器汇报；消息消费者向主服务器拉取消息时，如果消息消费者内存中存在消息消费进度时，主会尝试跟新消息消费进度。</p>
<p>读写分离的正确使用姿势：<br>1、主从Broker服务器的slaveReadEnable设置为true。<br>2、通过updateSubGroup命令更新消息组whichBrokerWhenConsumeSlowly、brokerId，特别是其brokerId不要设置为0，不然从从服务器拉取一次后，下一次拉取就会从主去拉取。</p>
</div>

			<script src="https://my.openwrite.cn/js/readmore.js" type="text/javascript"></script>
			<script>
			var isMobile = navigator.userAgent.match(/(phone|pad|pod|iPhone|iPod|ios|iPad|Android|Mobile|BlackBerry|IEMobile|MQQBrowser|JUC|Fennec|wOSBrowser|BrowserNG|WebOS|Symbian|Windows Phone)/i);
			if (!isMobile) {
			    var btw = new BTWPlugin();
			    btw.init({
			        "id": "vip-container",
			        "blogId": "18019-1573088808868-542",
			        "name": "中间件兴趣圈",
			        "qrcode": "https://img-blog.csdnimg.cn/20190314214003962.jpg",
			        "keyword": "more"
			    });
			}
			</script>
		
      
    </div>
    
    
    

    

    

    

    <footer class="post-footer">
      

      
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/posts/d198d6eb.html" rel="next" title="RocketMQ 多副本前置篇：初探raft协议">
                <i class="fa fa-chevron-left"></i> RocketMQ 多副本前置篇：初探raft协议
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/posts/7b95946e.html" rel="prev" title="RocketMQ ACL 使用指南">
                RocketMQ ACL 使用指南 <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>



    <div class="post-spread">
      
    </div>
  </div>


          </div>
          


          

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <section class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope itemtype="http://schema.org/Person">
            
              <p class="site-author-name" itemprop="name"></p>
              <p class="site-description motion-element" itemprop="description"></p>
          </div>

          <nav class="site-state motion-element">

            
              <div class="site-state-item site-state-posts">
              
                <a href="/archives/">
              
                  <span class="site-state-item-count">139</span>
                  <span class="site-state-item-name">日志</span>
                </a>
              </div>
            

            
              
              
              <div class="site-state-item site-state-categories">
                <a href="/categories/index.html">
                  <span class="site-state-item-count">18</span>
                  <span class="site-state-item-name">分类</span>
                </a>
              </div>
            

            

          </nav>

          

          

          
          

          
          

          

        </div>
      </section>

      
      <!--noindex-->
        <section class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
              
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-2"><a class="nav-link" href="#1%E3%80%81%E5%88%9D%E8%AF%86%E4%B8%BB%E4%BB%8E%E5%90%8C%E6%AD%A5"><span class="nav-number">1.</span> <span class="nav-text">1、初识主从同步</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#2%E3%80%81%E6%8F%90%E5%87%BA%E9%97%AE%E9%A2%98"><span class="nav-number">2.</span> <span class="nav-text">2、提出问题</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#3%E3%80%81%E5%8E%9F%E7%90%86%E6%8E%A2%E7%A9%B6"><span class="nav-number">3.</span> <span class="nav-text">3、原理探究</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#3-1-RocketMQ%E4%B8%BB%E4%BB%8E%E8%AF%BB%E5%86%99%E5%88%86%E7%A6%BB%E6%9C%BA%E5%88%B6"><span class="nav-number">3.1.</span> <span class="nav-text">3.1 RocketMQ主从读写分离机制</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#3-2-%E6%B6%88%E6%81%AF%E6%B6%88%E8%B4%B9%E8%BF%9B%E5%BA%A6%E5%90%8C%E6%AD%A5%E6%9C%BA%E5%88%B6"><span class="nav-number">3.2.</span> <span class="nav-text">3.2 消息消费进度同步机制</span></a><ol class="nav-child"><li class="nav-item nav-level-5"><a class="nav-link" href="#3-2-1-%E4%BB%8E%E6%9C%8D%E5%8A%A1%E5%AE%9A%E6%97%B6%E5%90%8C%E6%AD%A5%E4%B8%BB%E6%9C%8D%E5%8A%A1%E5%99%A8%E8%BF%9B%E5%BA%A6"><span class="nav-number">3.2.0.1.</span> <span class="nav-text">3.2.1 从服务定时同步主服务器进度</span></a></li><li class="nav-item nav-level-5"><a class="nav-link" href="#3-2-2-%E4%B8%BB%E6%9C%8D%E5%8A%A1%E5%99%A8%E6%B6%88%E6%81%AF%E6%8B%89%E5%8F%96%E6%97%B6%E6%9B%B4%E6%96%B0%E6%B6%88%E6%81%AF%E6%B6%88%E8%B4%B9%E8%BF%9B%E5%BA%A6"><span class="nav-number">3.2.0.2.</span> <span class="nav-text">3.2.2 主服务器消息拉取时更新消息消费进度</span></a></li></ol></li></ol></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#4%E3%80%81%E6%80%BB%E7%BB%93"><span class="nav-number">4.</span> <span class="nav-text">4、总结</span></a></li></ol></div>
            

          </div>
        </section>
      <!--/noindex-->
      

      

    </div>
  </aside>


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">&copy; <span itemprop="copyrightYear">2021</span>
  <span class="with-love">
    <i class="fa fa-user"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">中间件兴趣圈</span>

  
</div>


  <div class="powered-by">由 <a class="theme-link" target="_blank" href="https://hexo.io">Hexo</a> 强力驱动</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 &mdash; <a class="theme-link" target="_blank" href="https://github.com/iissnan/hexo-theme-next">NexT.Muse</a> v5.1.4</div>




        
<div class="busuanzi-count">
  <script async src="https://dn-lbstatics.qbox.me/busuanzi/2.3/busuanzi.pure.mini.js"></script>

  
    <span class="site-uv">
      <i class="fa fa-user"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_uv"></span>
      
    </span>
  

  
    <span class="site-pv">
      <i class="fa fa-eye"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_pv"></span>
      
    </span>
  
</div>








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
      </div>
    

    

  </div>

  

<script type="text/javascript">
  if (Object.prototype.toString.call(window.Promise) !== '[object Function]') {
    window.Promise = null;
  }
</script>









  












  
  
    <script type="text/javascript" src="/lib/jquery/index.js?v=2.1.3"></script>
  

  
  
    <script type="text/javascript" src="/lib/fastclick/lib/fastclick.min.js?v=1.0.6"></script>
  

  
  
    <script type="text/javascript" src="/lib/jquery_lazyload/jquery.lazyload.js?v=1.9.7"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/fancybox/source/jquery.fancybox.pack.js?v=2.1.5"></script>
  


  


  <script type="text/javascript" src="/js/src/utils.js?v=5.1.4"></script>

  <script type="text/javascript" src="/js/src/motion.js?v=5.1.4"></script>



  
  

  
  <script type="text/javascript" src="/js/src/scrollspy.js?v=5.1.4"></script>
<script type="text/javascript" src="/js/src/post-details.js?v=5.1.4"></script>



  


  <script type="text/javascript" src="/js/src/bootstrap.js?v=5.1.4"></script>



  


  




	





  





  












  





  

  
  <script src="https://cdn1.lncld.net/static/js/av-core-mini-0.6.4.js"></script>
  <script>AV.initialize("NNEhOL0iOcflg8f1U3HUqiCq-gzGzoHsz", "7kSmkbbb3DktmHALlShDsBUF");</script>
  <script>
    function showTime(Counter) {
      var query = new AV.Query(Counter);
      var entries = [];
      var $visitors = $(".leancloud_visitors");

      $visitors.each(function () {
        entries.push( $(this).attr("id").trim() );
      });

      query.containedIn('url', entries);
      query.find()
        .done(function (results) {
          var COUNT_CONTAINER_REF = '.leancloud-visitors-count';

          if (results.length === 0) {
            $visitors.find(COUNT_CONTAINER_REF).text(0);
            return;
          }

          for (var i = 0; i < results.length; i++) {
            var item = results[i];
            var url = item.get('url');
            var time = item.get('time');
            var element = document.getElementById(url);

            $(element).find(COUNT_CONTAINER_REF).text(time);
          }
          for(var i = 0; i < entries.length; i++) {
            var url = entries[i];
            var element = document.getElementById(url);
            var countSpan = $(element).find(COUNT_CONTAINER_REF);
            if( countSpan.text() == '') {
              countSpan.text(0);
            }
          }
        })
        .fail(function (object, error) {
          console.log("Error: " + error.code + " " + error.message);
        });
    }

    function addCount(Counter) {
      var $visitors = $(".leancloud_visitors");
      var url = $visitors.attr('id').trim();
      var title = $visitors.attr('data-flag-title').trim();
      var query = new AV.Query(Counter);

      query.equalTo("url", url);
      query.find({
        success: function(results) {
          if (results.length > 0) {
            var counter = results[0];
            counter.fetchWhenSave(true);
            counter.increment("time");
            counter.save(null, {
              success: function(counter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(counter.get('time'));
              },
              error: function(counter, error) {
                console.log('Failed to save Visitor num, with error message: ' + error.message);
              }
            });
          } else {
            var newcounter = new Counter();
            /* Set ACL */
            var acl = new AV.ACL();
            acl.setPublicReadAccess(true);
            acl.setPublicWriteAccess(true);
            newcounter.setACL(acl);
            /* End Set ACL */
            newcounter.set("title", title);
            newcounter.set("url", url);
            newcounter.set("time", 1);
            newcounter.save(null, {
              success: function(newcounter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(newcounter.get('time'));
              },
              error: function(newcounter, error) {
                console.log('Failed to create');
              }
            });
          }
        },
        error: function(error) {
          console.log('Error:' + error.code + " " + error.message);
        }
      });
    }

    $(function() {
      var Counter = AV.Object.extend("Counter");
      if ($('.leancloud_visitors').length == 1) {
        addCount(Counter);
      } else if ($('.post-title-link').length > 1) {
        showTime(Counter);
      }
    });
  </script>



  

  

  
  

  

  

  

</body>
</html>
